package com.siyoumi.app.test.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.siyoumi.app.rabbitmq.RabbitMqMessage;
import com.siyoumi.app.rabbitmq.RabbitMqProperties;
import com.siyoumi.app.rabbitmq.RabbitMqPublishAbs;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Delay-queue publisher: sends messages through an exchange of type
 * {@code x-delayed-message} so that each message carries its own delay.
 *
 * <p>{@link #init()} declares the exchange and queue and binds them using the
 * queue name as the routing key; {@link #publishMessage(RabbitMqMessage)}
 * publishes with an {@code x-delay} header expressed in milliseconds.
 *
 * <p>NOTE(review): the {@code x-delayed-message} exchange type is not a built-in
 * AMQP exchange type; it presumably relies on the RabbitMQ delayed-message
 * exchange plugin being enabled on the broker — confirm for the target
 * environment.
 */
@Slf4j
public class RabbitMqPublishDelay
        extends RabbitMqPublishAbs {
    private static final String EXCHANGE_NAME = "delay_exchange";
    private static final String QUEUE_NAME = "delay_queue";
    private static final String EXCHANGE_TYPE = "x-delayed-message";
    private static final int MILLIS_PER_SECOND = 1_000;
    /** AMQP delivery mode 2: the message is marked persistent. */
    private static final int DELIVERY_MODE_PERSISTENT = 2;

    /** @return the name of the delayed exchange this publisher declares and publishes to */
    @Override
    protected String getExchangeName() {
        return EXCHANGE_NAME;
    }

    /** @return the queue name, which is also used as binding key and routing key */
    @Override
    protected String getQueueName() {
        return QUEUE_NAME;
    }

    /**
     * Creates a new publisher and initializes it via {@link #init()}.
     *
     * @return a freshly created, initialized publisher
     */
    public static RabbitMqPublishAbs getIns() {
        RabbitMqPublishAbs mq = new RabbitMqPublishDelay();
        mq.init();
        return mq;
    }

    /**
     * Runs the parent initialization, then declares the delayed exchange and the
     * queue and binds them together.
     *
     * <p>Checked exceptions raised by the channel calls are rethrown unchanged
     * (undeclared) by Lombok's {@code @SneakyThrows}.
     */
    @Override
    @SneakyThrows
    public void init() {
        super.init();

        Map<String, Object> args = new HashMap<>();
        // Routing behaviour the delayed exchange applies once the delay has elapsed.
        args.put("x-delayed-type", "direct");
        // Optional: args.put("x-message-ttl", 5000); // messages would expire after 5 seconds

        // Declare the exchange: durable, not auto-deleted.
        getChannel().exchangeDeclare(getExchangeName(), EXCHANGE_TYPE, true, false, args);
        // Declare the queue: durable, non-exclusive, not auto-deleted, no extra arguments.
        getChannel().queueDeclare(getQueueName(), true, false, false, null);

        // The queue name doubles as the binding key.
        getChannel().queueBind(getQueueName(), getExchangeName(), getQueueName());
    }

    /**
     * Publishes {@code message} to the delayed exchange as a persistent message
     * whose {@code x-delay} header is the message's delay converted to
     * milliseconds. The content is encoded as UTF-8 so the bytes on the wire do
     * not depend on the JVM's default charset.
     *
     * @param message the message supplying the delay (in seconds) and the content
     * @throws ArithmeticException if the delay in milliseconds does not fit in an
     *                             {@code int} (previously this overflowed silently
     *                             and produced a wrong delay)
     */
    @Override
    @SneakyThrows
    public void publishMessage(RabbitMqMessage message) {
        // multiplyExact fails loudly instead of wrapping around to a bogus (possibly negative) delay.
        int delayMs = Math.multiplyExact(message.getDelaySecond(), MILLIS_PER_SECOND);

        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMs);
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .headers(headers)
                .deliveryMode(DELIVERY_MODE_PERSISTENT)
                // Optional: .expiration(delayMs + "") // message time-to-live in milliseconds
                .build();

        byte[] body = message.getContent().getBytes(StandardCharsets.UTF_8);
        getChannel().basicPublish(getExchangeName(), getQueueName(), properties, body);
    }
}